Skip to content

Conversation

@jakeloo
Copy link
Member

@jakeloo jakeloo commented Aug 29, 2025

Summary by CodeRabbit

  • New Features

    • Staging added as an optional block data source.
  • Performance

    • Poller tuned for speed (50 blocks / 100ms).
    • S3 reads switched to streaming with memory-limited concurrency.
    • Staging block-range lookups cached and refreshed in background.
  • Refactor

    • Unified worker–poller–committer execution model; validator/committer wired to worker/poller.
  • Behavior Changes

    • Failure-recoverer and related flags removed.
    • RPC fetch paths no longer perform automatic retries.
    • Batch/block-range logging improved; some public constructor signatures changed.

@coderabbitai
Copy link

coderabbitai bot commented Aug 29, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Rearchitects orchestration around a Worker–Poller–Committer model, removes FailureRecoverer and force-from-block flags, migrates Poller/Validator to Worker-backed fetches, removes RPC retry helpers, refactors S3 to streaming reads, adds a Staging source, and replaces staging deletion/last-staged APIs with staging-range queries and caches.

Changes

Cohort / File(s) Summary
CLI wiring and flags
cmd/migrate_valid.go, cmd/root.go, cmd/validate.go, cmd/validate_and_fix.go
Removed force-from-block and failure-recoverer flags/bindings; consolidated logging in migrate_valid.go; validator construction now injects worker.Worker (NewValidator signature changed).
Orchestrator core & lifecycle
internal/orchestrator/orchestrator.go
Rewires orchestrator to compose Worker and Poller, wires optional S3/Staging sources, instantiates Validator with the worker, starts Committer with the poller; removed boolean enablement and FailureRecoverer startup.
Committer
internal/orchestrator/committer.go, internal/orchestrator/committer_test.go
Removed WorkMode gating and related fields/options; NewCommitter now requires *Poller; startup/shutdown and data-fetch paths rewritten to use Poller.Request and integrated sequential data/gap handling; tests updated for new dependency.
Poller
internal/orchestrator/poller.go, internal/orchestrator/poller_test.go
Redesigned to be range-driven and worker-backed: added Request/pollBlockData, range-tracking fields and mutexes, changed defaults (blocks/interval), removed WorkMode/S3 option; tests removed and replaced with placeholders.
Validator
internal/orchestrator/validator.go, cmd/*
NewValidator signature updated to accept w *worker.Worker; validator uses worker.Run for fetches, converts worker results to BlockData, and logging/error paths adjusted.
FailureRecoverer removal
internal/orchestrator/failure_recoverer.go
Entire file removed (type, constructor, Start, constants).
Worker
internal/worker/worker.go
Worker now supports Staging source and rpcChunkSize; new NewWorkerWithSources constructor; batch/chunk processing and retry semantics reworked to return successes and failures; added Close.
RPC batching/fetch
internal/rpc/batcher.go, internal/rpc/rpc.go
Removed retry-capable batch helpers and adaptive 413/backoff logic; GetFullBlocks and other paths now call non-retry RPCFetch* variants — simpler, no adaptive retries.
Sources: S3 refactor & Staging added
internal/source/s3.go, internal/source/staging.go
S3: removed eager block index, added memory semaphore, streaming parquet reads, changed NewS3Source signature; Staging: new StagingSource with GetFullBlocks, GetSupportedBlockRange, and Close.
Storage interfaces and connectors
internal/storage/connector.go, internal/storage/badger.go, internal/storage/clickhouse.go, internal/storage/postgres.go
Replaced staging delete/last-staged APIs with GetStagingDataBlockRange; removed block-failure APIs; Badger adds in-memory range cache and background updater; ClickHouse/Postgres implement range query.
Storage minor edits & tests
internal/storage/block_buffer_badger.go, internal/storage/postgres_connector_test.go
Removed a debug log in badger buffer; tests updated to remove expectations on deleted staging APIs.
Mocks updated
test/mocks/MockIStagingStorage.go
Mock adapted from BlockFailure-centric API to BlockData-centric: GetBlockFailuresGetStagingData, added GetStagingDataBlockRange, removed block-failure/delete mocks and helpers.
Other test placeholders
internal/orchestrator/poller_test.go, internal/orchestrator/reorg_handler_test.go
Heavily reduced/placeholder tests (skipped) indicating tests must be rewritten for the new implementations.
Batch RPC helpers removed
internal/rpc/batcher.go
Deleted retry helpers: RPCFetchInBatchesWithRetry, RPCFetchSingleBatchWithRetry, and hasBatchError.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant CLI as CLI/Service
  participant Orch as Orchestrator
  participant W as Worker
  participant P as Poller
  participant V as Validator
  participant C as Committer
  participant S as Storage

  CLI->>Orch: Start()
  Orch->>W: Initialize Worker (staging? archive? RPC)
  Orch->>P: NewPoller(WithPollerWorker=W)
  Orch->>V: NewValidator(rpc, storage, W)
  Orch->>C: NewCommitter(rpc, storage, P, validator=V)
  par Run components
    Orch->>P: Start()
    Orch->>C: Start()
  end
  loop Committer loop
    C->>P: Request(ctx, blockRange)
    P->>W: Run(ctx, blocks)
    W->>S: GetFullBlocks (staging→archive→rpc)
    W-->>P: results (successes + failed)
    P->>Storage: stageResults(blockData)
    C->>Storage: commit main storage
  end
Loading
sequenceDiagram
  autonumber
  participant W as Worker
  participant SS as StagingSource
  participant AS as S3/Archive
  participant RPC as RPC

  W->>SS: GetFullBlocks(blocks) [if configured]
  SS-->>W: results (found/missing)
  alt missing
    W->>AS: GetFullBlocks(missing) [if configured]
    AS-->>W: results
  end
  alt still missing
    W->>RPC: GetFullBlocks(missing)
    RPC-->>W: results or errors
  end
  W-->>Caller: combined ordered results + failed set
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • Fix committer #276 — Mirrors major signature and architecture changes (NewValidator now accepts *worker.Worker, orchestrator/poller/committer/worker refactors) and appears directly related.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b54cf9a and aa8a6ee.

📒 Files selected for processing (1)
  • internal/orchestrator/poller.go (6 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jl/fix-committer

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbit in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbit in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbit gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbit read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbit help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbit ignore or @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbit summary or @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbit or @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 18

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
internal/storage/badger.go (1)

375-414: Race condition in chainRanges map update logic.

The logic for updating chainRanges has redundant comparisons and creates unnecessary struct copies. The min/max comparison logic could be simplified.

 // Track min/max for this chain
 chainStr := blockData.Block.ChainId.String()
-if r, exists := chainRanges[chainStr]; exists {
-    if blockData.Block.Number.Cmp(r.min) < 0 {
-        chainRanges[chainStr] = struct {
-            min *big.Int
-            max *big.Int
-        }{blockData.Block.Number, r.max}
-    }
-    if blockData.Block.Number.Cmp(r.max) > 0 {
-        chainRanges[chainStr] = struct {
-            min *big.Int
-            max *big.Int
-        }{r.min, blockData.Block.Number}
-    }
-} else {
-    chainRanges[chainStr] = struct {
-        min *big.Int
-        max *big.Int
-    }{blockData.Block.Number, blockData.Block.Number}
-}
+r, exists := chainRanges[chainStr]
+if !exists {
+    r = struct {
+        min *big.Int
+        max *big.Int
+    }{blockData.Block.Number, blockData.Block.Number}
+} else {
+    if blockData.Block.Number.Cmp(r.min) < 0 {
+        r.min = blockData.Block.Number
+    }
+    if blockData.Block.Number.Cmp(r.max) > 0 {
+        r.max = blockData.Block.Number
+    }
+}
+chainRanges[chainStr] = r
internal/storage/connector.go (1)

76-97: Defensive Close: guard against nil storages

Close() assumes all storages are non-nil. Add nil checks to avoid panics if any connector failed to initialize or is optional in other setups.

 func (s *IStorage) Close() error {
   var errs []error

   // Close each storage that implements Closer interface
-  if err := s.OrchestratorStorage.Close(); err != nil {
+  if s.OrchestratorStorage != nil {
+    if err := s.OrchestratorStorage.Close(); err != nil {
       errs = append(errs, fmt.Errorf("failed to close orchestrator storage: %w", err))
-  }
+    }
+  }

-  if err := s.MainStorage.Close(); err != nil {
+  if s.MainStorage != nil {
+    if err := s.MainStorage.Close(); err != nil {
       errs = append(errs, fmt.Errorf("failed to close main storage: %w", err))
-  }
+    }
+  }

-  if err := s.StagingStorage.Close(); err != nil {
+  if s.StagingStorage != nil {
+    if err := s.StagingStorage.Close(); err != nil {
       errs = append(errs, fmt.Errorf("failed to close staging storage: %w", err))
-  }
+    }
+  }
internal/worker/worker.go (2)

82-203: Retry path misclassifies failures as successes and drops error details

  • Error results are inserted into successMap (single-retry branch and recursive branches), causing errored blocks to be returned as “success”.
  • Recursive retries discard specific error details from subcalls.

Fix by:

  • Only storing nil-error results in successMap.
  • Accumulating failed results (with their original errors).
  • Preserving errors when building final results.
 func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, fetchFunc func(context.Context, []*big.Int) []rpc.GetFullBlockResult) (success []rpc.GetFullBlockResult, failed []rpc.GetFullBlockResult) {
@@
-  // Separate successful and failed
-  successMap := make(map[string]rpc.GetFullBlockResult)
-  var failedBlocks []*big.Int
+  // Separate successful and failed
+  successMap := make(map[string]rpc.GetFullBlockResult)
+  failedMap := make(map[string]rpc.GetFullBlockResult)
+  var failedBlocks []*big.Int
@@
-      if result.Error == nil {
-        successMap[chunk[i].String()] = result
-      } else {
-        failedBlocks = append(failedBlocks, chunk[i])
-      }
+      if result.Error == nil {
+        successMap[chunk[i].String()] = result
+      } else {
+        failedBlocks = append(failedBlocks, chunk[i])
+        failedMap[chunk[i].String()] = result
+      }
@@
-  if len(failedBlocks) == 1 {
-    retryResults := fetchFunc(ctx, failedBlocks)
-    if len(retryResults) > 0 {
-      if retryResults[0].Error == nil {
-        successMap[failedBlocks[0].String()] = retryResults[0]
-      } else {
-        // Keep the error result
-        successMap[failedBlocks[0].String()] = rpc.GetFullBlockResult{
-          BlockNumber: failedBlocks[0],
-          Error:       retryResults[0].Error,
-        }
-      }
-    }
+  if len(failedBlocks) == 1 {
+    retryResults := fetchFunc(ctx, failedBlocks)
+    if len(retryResults) > 0 {
+      if retryResults[0].Error == nil {
+        successMap[failedBlocks[0].String()] = retryResults[0]
+        delete(failedMap, failedBlocks[0].String())
+      } else {
+        // Preserve the specific error
+        failedMap[failedBlocks[0].String()] = retryResults[0]
+      }
+    }
   } else if len(failedBlocks) > 1 {
@@
-    var rwg sync.WaitGroup
-    var rwgMutex sync.Mutex
+    var rwg sync.WaitGroup
+    var mu sync.Mutex
@@
-      leftResults, _ := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
-      // Add results to map
-      for _, r := range leftResults {
-        if r.BlockNumber != nil {
-          rwgMutex.Lock()
-          successMap[r.BlockNumber.String()] = r
-          rwgMutex.Unlock()
-        }
-      }
+      lsucc, lfail := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
+      mu.Lock()
+      for _, r := range lsucc {
+        if r.BlockNumber != nil {
+          successMap[r.BlockNumber.String()] = r
+          delete(failedMap, r.BlockNumber.String())
+        }
+      }
+      for _, r := range lfail {
+        if r.BlockNumber != nil {
+          failedMap[r.BlockNumber.String()] = r
+        }
+      }
+      mu.Unlock()
     }()
@@
-      rightResults, _ := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
-      // Add results to map
-      for _, r := range rightResults {
-        if r.BlockNumber != nil {
-          rwgMutex.Lock()
-          successMap[r.BlockNumber.String()] = r
-          rwgMutex.Unlock()
-        }
-      }
+      rsucc, rfail := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
+      mu.Lock()
+      for _, r := range rsucc {
+        if r.BlockNumber != nil {
+          successMap[r.BlockNumber.String()] = r
+          delete(failedMap, r.BlockNumber.String())
+        }
+      }
+      for _, r := range rfail {
+        if r.BlockNumber != nil {
+          failedMap[r.BlockNumber.String()] = r
+        }
+      }
+      mu.Unlock()
     }()
@@
-  // Build final results in original order
-  var finalResults []rpc.GetFullBlockResult
-  var failedResults []rpc.GetFullBlockResult
+  // Build final results in original order
+  var finalResults []rpc.GetFullBlockResult
+  var failedResults []rpc.GetFullBlockResult
   for _, block := range chunk {
-    if result, ok := successMap[block.String()]; ok {
+    if result, ok := successMap[block.String()]; ok && result.Error == nil {
       finalResults = append(finalResults, result)
     } else {
-      // This should not happen as we have retried all failed blocks
-      failedResults = append(failedResults, rpc.GetFullBlockResult{
-        BlockNumber: block,
-        Error:       fmt.Errorf("failed to fetch block"),
-      })
+      if fr, ok := failedMap[block.String()]; ok && fr.Error != nil {
+        failedResults = append(failedResults, fr)
+      } else {
+        failedResults = append(failedResults, rpc.GetFullBlockResult{
+          BlockNumber: block,
+          Error:       fmt.Errorf("failed to fetch block"),
+        })
+      }
     }
   }

440-450: Close staging source as well

Worker.Close() only closes archive. Also close staging (if set).

 func (w *Worker) Close() error {
   // Close archive if it exists
   if w.archive != nil {
     log.Debug().Msg("Closing archive connection")
     w.archive.Close()
   }
+  // Close staging if it exists
+  if w.staging != nil {
+    log.Debug().Msg("Closing staging connection")
+    w.staging.Close()
+  }

   log.Debug().Msg("Worker closed successfully")
   return nil
 }
internal/orchestrator/orchestrator.go (1)

115-120: Shutdown: also close worker

Close worker to release any archive/staging resources before closing storage.

   // Waiting for all goroutines to complete
   o.wg.Wait()

+  if o.worker != nil {
+    if err := o.worker.Close(); err != nil {
+      log.Error().Err(err).Msg("Error closing worker")
+    }
+  }
   if err := o.storage.Close(); err != nil {
     log.Error().Err(err).Msg("Error closing storage connections")
   }
internal/orchestrator/committer.go (2)

273-279: Nil guard for StagingStorage during cleanup

If StagingStorage is not configured, this will panic. Guard it.

 func (c *Committer) cleanupProcessedStagingBlocks() {
@@
   chainID := c.rpc.GetChainID()
   blockNumber := new(big.Int).SetUint64(limit)
   stagingDeleteStart := time.Now()
-  if err := c.storage.StagingStorage.DeleteStagingDataOlderThan(chainID, blockNumber); err != nil {
+  if c.storage.StagingStorage == nil {
+    return
+  }
+  if err := c.storage.StagingStorage.DeleteStagingDataOlderThan(chainID, blockNumber); err != nil {
     log.Error().Err(err).Msg("Failed to delete staging data")
     return
   }

288-301: Potential nil dereference when logging latestCommittedBlockNumber

latestCommittedBlockNumber may be nil; calling .String() or .Sign() will panic. Check for nil first.

- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
- log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
- if err != nil {
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ if err != nil {
   return nil, err
 }
+ if latestCommittedBlockNumber != nil {
+   log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
+ } else {
+   log.Debug().Msg("Committer found no blocks in main storage (nil)")
+ }
 
-if latestCommittedBlockNumber.Sign() == 0 {
+if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
🧹 Nitpick comments (16)
internal/storage/badger.go (2)

428-453: Optimize cache update logic to avoid unnecessary allocations.

The cache update creates new big.Int instances when updating existing ranges, which is unnecessary.

 for chainStr, newRange := range chainRanges {
     existing, exists := bc.rangeCache[chainStr]
     if exists {
         // Update existing range
         if newRange.min.Cmp(existing.min) < 0 {
-            existing.min = newRange.min
+            existing.min = new(big.Int).Set(newRange.min)
         }
         if newRange.max.Cmp(existing.max) > 0 {
-            existing.max = newRange.max
+            existing.max = new(big.Int).Set(newRange.max)
         }
         existing.lastUpdated = time.Now()

669-680: Potential integer overflow when calculating new minimum block.

Adding 1 to blockNumber could theoretically overflow, though unlikely with big.Int.

Consider adding a comment explaining the logic:

 // The new minimum must be blockNumber + 1 or higher
+// Since we deleted all blocks <= blockNumber, the new minimum
+// is the next block if it exists, otherwise the cache entry is removed
 newMin := new(big.Int).Add(blockNumber, big.NewInt(1))
internal/source/s3.go (2)

625-681: Complex nested loop logic in streaming reader needs simplification.

The readRowGroupStreamingly function has deeply nested conditionals and could benefit from early returns.

Consider extracting the row processing logic:

func (s *S3Source) processParquetRow(row parquet.Row, blockMap map[uint64]bool, foundBlocks map[uint64]bool, results map[uint64]rpc.GetFullBlockResult) error {
    if len(row) < 8 {
        return nil // Skip invalid row
    }
    
    blockNum := row[1].Uint64()
    if !blockMap[blockNum] || foundBlocks[blockNum] {
        return nil // Skip unrequested or already found blocks
    }
    
    // Build and parse block data...
}

544-545: Remove outdated comments about block index.

The comments reference block index building which has been removed from the implementation.

-// Don't build block index immediately - build on demand to save memory
-// Block indices will be built lazily when needed
+// File is now ready for streaming reads
internal/storage/clickhouse.go (1)

1887-1910: Return semantics: ensure callers handle (nil, nil) range safely

MIN/MAX always return one row in ClickHouse; you’re correctly handling NULLs by returning (nil, nil). Verify all call sites guard against nil dereferences and treat “no staging data” distinctly from an error. Consider documenting this behavior above the method.

internal/orchestrator/validator.go (1)

23-29: Nil-guard worker dependency

NewValidator requires a non-nil worker. Add a defensive nil-check to fail fast in misconfigured tests/CLI wiring.

 func NewValidator(rpcClient rpc.IRPCClient, s storage.IStorage, w *worker.Worker) *Validator {
+    if w == nil {
+        panic("validator requires non-nil worker")
+    }
     return &Validator{
         rpc:     rpcClient,
         storage: s,
         worker:  w,
     }
 }
cmd/migrate_valid.go (2)

316-316: Reuse a single Worker instance

You construct two Workers (one for validator, one for migrator). Reuse one to share RPC concurrency limits and avoid duplicate semaphores.

-    validator := orchestrator.NewValidator(rpcClient, sourceConnector, worker.NewWorker(rpcClient))
+    w := worker.NewWorker(rpcClient)
+    validator := orchestrator.NewValidator(rpcClient, sourceConnector, w)
@@
     return &Migrator{
         batchSize:   batchSize,
         rpcClient:   rpcClient,
         source:      sourceConnector,
         destination: destinationConnector,
-        validator:   validator,
-        worker:      worker.NewWorker(rpcClient),
+        validator:   validator,
+        worker:      w,
     }

Also applies to: 323-331


235-236: Minor: structured context already present; drop Msgf

You already attach fields via Dur/Int. Prefer Msg with a static message to avoid mixing Msgf formatting with structured fields.

- log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Msgf("Worker %d: Identified missing blocks", workerID)
+ log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Int("worker_id", workerID).Msg("Identified missing blocks")
internal/storage/connector.go (1)

111-116: Typos in comments

  • “orchestartor” → “orchestrator”
  • “emphemeral” → “ephemeral”
internal/worker/worker.go (1)

425-435: Logging: incorrect last_block index

Use the last requested block, not results length, to index blockNumbers.

-      Str("first_block", blockNumbers[0].String()).
-      Str("last_block", blockNumbers[len(results)-1].String()).
+      Str("first_block", blockNumbers[0].String()).
+      Str("last_block", blockNumbers[len(blockNumbers)-1].String()).
internal/orchestrator/committer.go (3)

375-387: RPC latest block call: add timeout to avoid long stalls

Bound the RPC call with a short timeout to prevent blocking the commit loop.

-  latestBlock, err := c.rpc.GetLatestBlockNumber(ctx)
+  ctxLatest, cancel := context.WithTimeout(ctx, 3*time.Second)
+  defer cancel()
+  latestBlock, err := c.rpc.GetLatestBlockNumber(ctxLatest)
   if err != nil {
     return nil, fmt.Errorf("error getting latest block from RPC: %v", err)
   }

446-449: Metric should record the first missing block, not the first present block

Set MissedBlockNumbers to expectedBlockNumber (the first missing), not blocksData[0].

-      metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
+      metrics.MissedBlockNumbers.Set(float64(expectedBlockNumber.Int64()))

520-529: Persist lastPublished in default mode after async publish

In default mode, you only update in-memory lastPublishedBlock. Consider persisting to OrchestratorStorage post-publish success for resilience across restarts.

I can provide a small helper to persist the highest block on success if desired.

internal/orchestrator/poller.go (3)

48-56: Add nil check for worker configuration

While there's a nil check for the worker config, it silently returns without any warning. This could lead to confusion if a nil worker is accidentally passed.

Add logging for better debugging:

 func WithPollerWorker(cfg *worker.Worker) PollerOption {
 	return func(p *Poller) {
 		if cfg == nil {
+			log.Warn().Msg("WithPollerWorker called with nil worker, ignoring")
 			return
 		}
 		
 		p.worker = cfg
 	}
 }

315-319: Consider using structured logging more consistently

The debug log uses structured fields which is good, but could be more consistent with field naming.

Use consistent field names:

 log.Debug().
-	Str("last_pending_block", lastPendingFetchBlock.String()).
-	Str("last_polled_block", lastPolledBlock.String()).
-	Str("last_requested_block", lastRequestedBlock.String()).
+	Str("last_pending_fetch_block", lastPendingFetchBlock.String()).
+	Str("last_polled_block", lastPolledBlock.String()).
+	Str("last_requested_block", lastRequestedBlock.String()).
 	Msgf("GetNextBlockRange for poller workers")

239-258: Consider memory optimization for large block ranges

The pollBlockData method could potentially handle large block ranges. The current implementation creates all block data in memory at once.

For very large block ranges, consider implementing streaming or chunked processing to reduce memory footprint. This could be particularly important if blocksPerPoll is set to a large value.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between acdb2eb and 37a24cb.

📒 Files selected for processing (23)
  • cmd/migrate_valid.go (4 hunks)
  • cmd/root.go (0 hunks)
  • cmd/validate.go (2 hunks)
  • cmd/validate_and_fix.go (2 hunks)
  • internal/orchestrator/committer.go (7 hunks)
  • internal/orchestrator/failure_recoverer.go (0 hunks)
  • internal/orchestrator/orchestrator.go (4 hunks)
  • internal/orchestrator/poller.go (6 hunks)
  • internal/orchestrator/poller_test.go (9 hunks)
  • internal/orchestrator/reorg_handler.go (0 hunks)
  • internal/orchestrator/reorg_handler_test.go (0 hunks)
  • internal/orchestrator/validator.go (4 hunks)
  • internal/rpc/batcher.go (0 hunks)
  • internal/rpc/rpc.go (2 hunks)
  • internal/source/s3.go (9 hunks)
  • internal/source/staging.go (1 hunks)
  • internal/storage/badger.go (8 hunks)
  • internal/storage/block_buffer_badger.go (0 hunks)
  • internal/storage/clickhouse.go (1 hunks)
  • internal/storage/connector.go (1 hunks)
  • internal/storage/postgres.go (1 hunks)
  • internal/storage/postgres_connector_test.go (0 hunks)
  • internal/worker/worker.go (10 hunks)
💤 Files with no reviewable changes (7)
  • internal/storage/block_buffer_badger.go
  • internal/storage/postgres_connector_test.go
  • internal/orchestrator/reorg_handler.go
  • internal/orchestrator/reorg_handler_test.go
  • cmd/root.go
  • internal/rpc/batcher.go
  • internal/orchestrator/failure_recoverer.go
🧰 Additional context used
🧬 Code graph analysis (12)
cmd/validate.go (2)
internal/orchestrator/validator.go (1)
  • NewValidator (23-29)
internal/worker/worker.go (1)
  • NewWorker (43-49)
internal/source/staging.go (3)
internal/storage/connector.go (2)
  • IStagingStorage (112-121)
  • QueryFilter (11-29)
internal/rpc/rpc.go (1)
  • GetFullBlockResult (18-22)
internal/common/block.go (1)
  • Block (8-33)
internal/orchestrator/validator.go (4)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/worker/worker.go (1)
  • Worker (35-41)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/badger.go (2)
configs/config.go (1)
  • BadgerConfig (82-84)
internal/common/block.go (1)
  • Block (8-33)
internal/rpc/rpc.go (6)
internal/rpc/batcher.go (2)
  • RPCFetchSingleBatch (53-83)
  • RPCFetchInBatches (19-51)
internal/common/block.go (1)
  • RawBlock (74-74)
internal/rpc/params.go (4)
  • GetBlockWithTransactionsParams (9-11)
  • GetBlockReceiptsParams (29-31)
  • GetLogsParams (21-23)
  • TraceBlockParams (25-27)
internal/common/log.go (2)
  • RawReceipts (73-73)
  • RawLogs (72-72)
configs/config.go (1)
  • Cfg (289-289)
internal/common/trace.go (1)
  • RawTraces (34-34)
cmd/validate_and_fix.go (2)
internal/orchestrator/validator.go (1)
  • NewValidator (23-29)
internal/worker/worker.go (1)
  • NewWorker (43-49)
cmd/migrate_valid.go (2)
internal/orchestrator/validator.go (1)
  • NewValidator (23-29)
internal/worker/worker.go (1)
  • NewWorker (43-49)
internal/worker/worker.go (2)
internal/source/source.go (1)
  • ISource (10-14)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/source/s3.go (3)
configs/config.go (1)
  • S3SourceConfig (247-255)
internal/rpc/rpc.go (1)
  • GetFullBlockResult (18-22)
internal/storage/s3.go (1)
  • ParquetBlockData (51-60)
internal/orchestrator/orchestrator.go (9)
internal/rpc/rpc.go (1)
  • IRPCClient (42-55)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/worker/worker.go (2)
  • Worker (35-41)
  • NewWorkerWithSources (52-57)
internal/orchestrator/poller.go (3)
  • Poller (22-39)
  • NewPoller (95-118)
  • WithPollerWorker (48-56)
internal/orchestrator/validator.go (1)
  • NewValidator (23-29)
internal/orchestrator/committer.go (2)
  • NewCommitter (46-86)
  • WithValidator (40-44)
internal/source/source.go (1)
  • ISource (10-14)
internal/source/s3.go (1)
  • NewS3Source (85-173)
internal/source/staging.go (1)
  • NewStagingSource (17-22)
internal/orchestrator/poller.go (6)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/worker/worker.go (1)
  • Worker (35-41)
internal/storage/connector.go (1)
  • IStorage (69-73)
configs/config.go (1)
  • Cfg (289-289)
internal/metrics/metrics.go (3)
  • PollerLastTriggeredBlock (59-62)
  • PolledBatchSize (52-55)
  • StagingInsertDuration (111-115)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/orchestrator/committer.go (4)
internal/rpc/rpc.go (1)
  • IRPCClient (42-55)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/orchestrator/poller.go (1)
  • Poller (22-39)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
🔇 Additional comments (12)
internal/storage/badger.go (1)

42-45: Remove manual directory checks BadgerDB’s Open call automatically creates the specified Dir/ValueDir (using os.MkdirAll) and returns an error on failure, so adding your own os.MkdirAll and write-test is redundant.

Likely an incorrect or invalid review comment.

internal/source/s3.go (1)

85-86: Signature change safe — no callers broken. Existing call in orchestrator.go uses the new (chainId, cfg) order and no other references exist; no action required.

cmd/validate.go (1)

62-62: LGTM! Clean integration of Worker pattern.

The change properly integrates the Worker pattern into the validator, maintaining clean separation of concerns.

cmd/validate_and_fix.go (1)

120-120: LGTM! Consistent Worker integration.

The Worker instantiation is consistent with other command implementations.

internal/orchestrator/poller_test.go (1)

1-566: LGTM! Clean removal of ForceFromBlock feature.

The test updates correctly remove all references to the deprecated ForceFromBlock feature while maintaining comprehensive test coverage for the remaining functionality.

internal/storage/postgres.go (1)

407-438: LGTM: robust NULL handling for MIN/MAX on staging range

Correct use of sql.NullString and big.Int parsing; returns (nil, nil) when no data. Behavior aligns with interface and CH implementation.

internal/worker/worker.go (1)

25-27: Staging source type addition looks good

New SourceTypeStaging is consistent with the source abstraction.

internal/orchestrator/orchestrator.go (1)

59-69: Poller/Committer goroutine lifecycle

Good use of context cancellation on component completion to drive a coordinated shutdown.

internal/rpc/rpc.go (2)

254-265: LGTM: batching paths align with new no-retry batcher

The swaps to RPCFetchInBatches are consistent with the architectural change.


241-249: No direct callers of GetFullBlocks outside Worker
Search shows all GetFullBlocks invocations occur in internal/worker/worker.go (and in mocks within tests), so production code routes through Worker’s retry logic.

internal/orchestrator/committer.go (1)

88-107: Publisher position initialization: sensible max(lastPublished, lastCommitted)

Good defensive logic to avoid republishing older offsets.

internal/orchestrator/poller.go (1)

273-288: Good error handling in staging

The staging method properly handles empty data, logs errors, and records metrics. This is a good implementation pattern.

Comment on lines +122 to +144
func (o *Orchestrator) initializeWorkerAndPoller() {
var s3, staging source.ISource
var err error

chainId := o.rpc.GetChainID()
if config.Cfg.Poller.S3.Bucket != "" && config.Cfg.Poller.S3.Region != "" {
s3, err = source.NewS3Source(chainId, config.Cfg.Poller.S3)
if err != nil {
log.Fatal().Err(err).Msg("Error creating S3 source for worker")
return
}
}

if o.storage.StagingStorage != nil {
if staging, err = source.NewStagingSource(chainId, o.storage.StagingStorage); err != nil {
log.Fatal().Err(err).Msg("Error creating Staging source for worker")
return
}
}

o.worker = worker.NewWorkerWithSources(o.rpc, s3, staging)
o.poller = NewPoller(o.rpc, o.storage, WithPollerWorker(o.worker))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Don’t exit process on source initialization failures; degrade gracefully

log.Fatal terminates the process. Prefer logging the error and proceeding with remaining sources (RPC/other), so the system still runs.

   if config.Cfg.Poller.S3.Bucket != "" && config.Cfg.Poller.S3.Region != "" {
     s3, err = source.NewS3Source(chainId, config.Cfg.Poller.S3)
     if err != nil {
-      log.Fatal().Err(err).Msg("Error creating S3 source for worker")
-      return
+      log.Error().Err(err).Msg("Error creating S3 source for worker; continuing without S3")
+      s3 = nil
     }
   }
@@
   if o.storage.StagingStorage != nil {
     if staging, err = source.NewStagingSource(chainId, o.storage.StagingStorage); err != nil {
-      log.Fatal().Err(err).Msg("Error creating Staging source for worker")
-      return
+      log.Error().Err(err).Msg("Error creating Staging source for worker; continuing without staging")
+      staging = nil
     }
   }
🤖 Prompt for AI Agents
In internal/orchestrator/orchestrator.go around lines 122-144, replace the
log.Fatal calls used when creating S3 or Staging sources so the process does not
exit: log the error (error or warn) and continue without returning; do not call
os.Exit or log.Fatal. Ensure NewWorkerWithSources can accept nil for s3 or
staging (or construct the worker with only the available sources), and only
fail/return if neither RPC nor any required runtime dependency exists; remove
the early returns so the orchestrator degrades gracefully and initializes the
worker and poller with whatever sources succeeded.

Comment on lines +86 to +88
poller.lastPolledBlock = big.NewInt(0)
poller.lastRequestedBlock = big.NewInt(0)
poller.lastPendingFetchBlock = big.NewInt(0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential initialization race condition

The lastPolledBlock, lastRequestedBlock, and lastPendingFetchBlock are initialized to 0 for boundless pollers, but the regular NewPoller might set different values (lines 99-116). If Start() is called before these values are properly initialized in NewPoller, it could lead to incorrect behavior.

These fields should be initialized atomically or the poller should not be startable until fully initialized. Consider adding an initialization flag:

 type Poller struct {
 	chainId                    *big.Int
+	initialized                bool
+	initializedMutex           sync.RWMutex
 	// ... other fields
 }

 func (p *Poller) Start(ctx context.Context) {
+	p.initializedMutex.RLock()
+	if !p.initialized {
+		p.initializedMutex.RUnlock()
+		log.Error().Msg("Poller not initialized")
+		return
+	}
+	p.initializedMutex.RUnlock()
 	// ... rest of Start logic
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
poller.lastPolledBlock = big.NewInt(0)
poller.lastRequestedBlock = big.NewInt(0)
poller.lastPendingFetchBlock = big.NewInt(0)
// internal/orchestrator/poller.go
type Poller struct {
chainId *big.Int
initialized bool
initializedMutex sync.RWMutex
// ... other fields
}
func (p *Poller) Start(ctx context.Context) {
p.initializedMutex.RLock()
if !p.initialized {
p.initializedMutex.RUnlock()
log.Error().Msg("Poller not initialized")
return
}
p.initializedMutex.RUnlock()
// ... rest of Start logic
}
🤖 Prompt for AI Agents
In internal/orchestrator/poller.go around lines 86 to 88, the poller fields
lastPolledBlock, lastRequestedBlock and lastPendingFetchBlock are set to zero
which can race with NewPoller assigning other initial values; ensure
initialization is atomic by introducing an initialized flag (or use sync.Once)
plus a mutex: set the three big.Int fields and flip the initialized flag while
holding the lock in NewPoller, and have Start() check the initialized flag (or
wait on the Once) under the same lock before proceeding so Start cannot run
until full initialization is complete.

Comment on lines +101 to 109
highestBlockFromMainStorage, err := storage.MainStorage.GetMaxBlockNumber(poller.chainId)
if err != nil {
log.Error().Err(err).Msg("Error getting last block in main storage")
} else if highestBlockFromMainStorage != nil && highestBlockFromMainStorage.Sign() > 0 {
if highestBlockFromMainStorage.Cmp(fromBlock) > 0 {
log.Debug().Msgf("Main storage block %s is higher than configured start block %s", highestBlockFromMainStorage.String(), fromBlock.String())
lastPolledBlock = highestBlockFromMainStorage
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling for storage initialization

The error from GetMaxBlockNumber is logged but not propagated. This could lead to silent failures during initialization.

Consider returning an error from the constructor or at least setting a flag to indicate initialization issues:

 highestBlockFromMainStorage, err := storage.MainStorage.GetMaxBlockNumber(poller.chainId)
 if err != nil {
-	log.Error().Err(err).Msg("Error getting last block in main storage")
+	log.Error().Err(err).Msg("Error getting last block in main storage, using configured start block")
+	// Consider adding a health check flag
+	// p.initializationWarning = true
 } else if highestBlockFromMainStorage != nil && highestBlockFromMainStorage.Sign() > 0 {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
highestBlockFromMainStorage, err := storage.MainStorage.GetMaxBlockNumber(poller.chainId)
if err != nil {
log.Error().Err(err).Msg("Error getting last block in main storage")
} else if highestBlockFromMainStorage != nil && highestBlockFromMainStorage.Sign() > 0 {
if highestBlockFromMainStorage.Cmp(fromBlock) > 0 {
log.Debug().Msgf("Main storage block %s is higher than configured start block %s", highestBlockFromMainStorage.String(), fromBlock.String())
lastPolledBlock = highestBlockFromMainStorage
}
}
highestBlockFromMainStorage, err := storage.MainStorage.GetMaxBlockNumber(poller.chainId)
if err != nil {
log.Error().Err(err).Msg("Error getting last block in main storage, using configured start block")
// Consider adding a health check flag
// p.initializationWarning = true
} else if highestBlockFromMainStorage != nil && highestBlockFromMainStorage.Sign() > 0 {
if highestBlockFromMainStorage.Cmp(fromBlock) > 0 {
log.Debug().Msgf(
"Main storage block %s is higher than configured start block %s",
highestBlockFromMainStorage.String(),
fromBlock.String(),
)
lastPolledBlock = highestBlockFromMainStorage
}
}
🤖 Prompt for AI Agents
internal/orchestrator/poller.go around lines 101 to 109: the call to
storage.MainStorage.GetMaxBlockNumber logs an error but swallows it, risking
silent initialization failures; modify the initialization flow to propagate this
error (return it from the constructor) or set an explicit initialization failure
flag on the poller and halt further setup when err != nil, ensuring callers can
detect and handle the failure; update callers to handle the returned error or
check the flag and abort startup accordingly, and keep the existing log but add
context before returning/setting the flag.

Comment on lines +116 to 119
GetStagingDataBlockRange(chainId *big.Int) (minBlockNumber *big.Int, maxBlockNumber *big.Int, err error)

// Block failures
GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
StoreBlockFailures(failures []common.BlockFailure) error
DeleteBlockFailures(failures []common.BlockFailure) error
DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Interface evolution: verify all drivers implement the new methods

Ensure every staging driver (postgres, clickhouse, badger) now implements GetStagingDataBlockRange and DeleteStagingDataOlderThan to prevent runtime type assertion failures.


🏁 Script executed:

#!/bin/bash
# Verify IStagingStorage implementations expose required methods
rg -nP 'type\s+\w+\s+struct\s*\{|\bfunc\s+\(\s*\*?\w+\s*\)\s+GetStagingDataBlockRange\b|\bfunc\s+\(\s*\*?\w+\s*\)\s+DeleteStagingDataOlderThan\b' -C2 internal/storage

Length of output: 9168


🏁 Script executed:

#!/bin/bash
# Display the IStagingStorage interface to verify new methods
rg -n -C5 'type IStagingStorage' internal/storage/connector.go

Length of output: 465


Implement IStagingStorage methods in all staging connectors

  • PostgresConnector (internal/storage/postgres.go): add GetStagingDataBlockRange and DeleteStagingDataOlderThan
  • ClickHouseConnector (internal/storage/clickhouse.go): add GetStagingDataBlockRange and DeleteStagingDataOlderThan
  • BadgerConnector (internal/storage/badger.go): add GetStagingDataBlockRange and DeleteStagingDataOlderThan

Ensure each method’s logic correctly reflects the underlying store to prevent runtime panics.

🤖 Prompt for AI Agents
In internal/storage/connector.go around lines 116 to 119, the IStagingStorage
interface was extended with GetStagingDataBlockRange and
DeleteStagingDataOlderThan but the PostgresConnector, ClickHouseConnector, and
BadgerConnector implementations are missing those methods; implement
GetStagingDataBlockRange(chainId *big.Int) (minBlockNumber *big.Int,
maxBlockNumber *big.Int, err error) and DeleteStagingDataOlderThan(chainId
*big.Int, blockNumber *big.Int) error in internal/storage/postgres.go,
internal/storage/clickhouse.go, and internal/storage/badger.go, ensuring each
method uses the concrete storage APIs (SQL queries for Postgres, appropriate
ClickHouse queries, and Badger key iteration/metadata for Badger), returns
meaningful errors instead of panicking, safely handles nil chainId or nil
results, converts numeric block values to *big.Int consistently, and includes
bounds checks so empty stores return (nil, nil, nil) or a clear error rather
than causing runtime panics.

Comment on lines 205 to 265
// processChunk
func (w *Worker) processChunk(ctx context.Context, chunk []*big.Int, fetchFunc func(context.Context, []*big.Int) []rpc.GetFullBlockResult) (success []rpc.GetFullBlockResult, failed []rpc.GetFullBlockResult) {
select {
case <-ctx.Done():
// Return error results for all blocks if context cancelled
var results []rpc.GetFullBlockResult
for _, block := range chunk {
results = append(results, rpc.GetFullBlockResult{
BlockNumber: block,
Error: fmt.Errorf("context cancelled"),
})
}
return nil, results
default:
}

// Fetch the chunk
results := fetchFunc(ctx, chunk)

// If we got all results, return them
if len(results) == len(chunk) {
allSuccess := true
for _, r := range results {
if r.Error != nil {
allSuccess = false
break
}
}
for _, r := range rightResults {
if r.BlockNumber != nil {
successMap[r.BlockNumber.String()] = r
if allSuccess {
return results, nil
}
}

// Separate successful and failed
successMap := make(map[string]rpc.GetFullBlockResult)

for i, result := range results {
if i < len(chunk) {
if result.Error == nil {
successMap[chunk[i].String()] = result
}
}
}

// Build final results in original order
var finalResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult
for _, block := range chunk {
if result, ok := successMap[block.String()]; ok {
finalResults = append(finalResults, result)
} else {
// Add error result for missing blocks
finalResults = append(finalResults, rpc.GetFullBlockResult{
// This should not happen as we have retried all failed blocks
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: block,
Error: fmt.Errorf("failed to fetch block"),
})
}
}

return finalResults
return finalResults, failedResults
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Non-retry path loses original error information

processChunk returns generic “failed to fetch block” errors, discarding specific RPC errors. Keep original error details.

-  // Separate successful and failed
-  successMap := make(map[string]rpc.GetFullBlockResult)
+  // Separate successful and failed
+  successMap := make(map[string]rpc.GetFullBlockResult)
+  errorMap := make(map[string]rpc.GetFullBlockResult)
@@
-      if result.Error == nil {
-        successMap[chunk[i].String()] = result
-      }
+      if result.Error == nil {
+        successMap[chunk[i].String()] = result
+      } else {
+        errorMap[chunk[i].String()] = result
+      }
@@
-  var failedResults []rpc.GetFullBlockResult
+  var failedResults []rpc.GetFullBlockResult
   for _, block := range chunk {
     if result, ok := successMap[block.String()]; ok {
       finalResults = append(finalResults, result)
     } else {
-      // This should not happen as we have retried all failed blocks
-      failedResults = append(failedResults, rpc.GetFullBlockResult{
-        BlockNumber: block,
-        Error:       fmt.Errorf("failed to fetch block"),
-      })
+      if fr, ok := errorMap[block.String()]; ok && fr.Error != nil {
+        failedResults = append(failedResults, fr)
+      } else {
+        failedResults = append(failedResults, rpc.GetFullBlockResult{
+          BlockNumber: block,
+          Error:       fmt.Errorf("failed to fetch block"),
+        })
+      }
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// processChunk
func (w *Worker) processChunk(ctx context.Context, chunk []*big.Int, fetchFunc func(context.Context, []*big.Int) []rpc.GetFullBlockResult) (success []rpc.GetFullBlockResult, failed []rpc.GetFullBlockResult) {
select {
case <-ctx.Done():
// Return error results for all blocks if context cancelled
var results []rpc.GetFullBlockResult
for _, block := range chunk {
results = append(results, rpc.GetFullBlockResult{
BlockNumber: block,
Error: fmt.Errorf("context cancelled"),
})
}
return nil, results
default:
}
// Fetch the chunk
results := fetchFunc(ctx, chunk)
// If we got all results, return them
if len(results) == len(chunk) {
allSuccess := true
for _, r := range results {
if r.Error != nil {
allSuccess = false
break
}
}
for _, r := range rightResults {
if r.BlockNumber != nil {
successMap[r.BlockNumber.String()] = r
if allSuccess {
return results, nil
}
}
// Separate successful and failed
successMap := make(map[string]rpc.GetFullBlockResult)
for i, result := range results {
if i < len(chunk) {
if result.Error == nil {
successMap[chunk[i].String()] = result
}
}
}
// Build final results in original order
var finalResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult
for _, block := range chunk {
if result, ok := successMap[block.String()]; ok {
finalResults = append(finalResults, result)
} else {
// Add error result for missing blocks
finalResults = append(finalResults, rpc.GetFullBlockResult{
// This should not happen as we have retried all failed blocks
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: block,
Error: fmt.Errorf("failed to fetch block"),
})
}
}
return finalResults
return finalResults, failedResults
}
// Separate successful and failed
successMap := make(map[string]rpc.GetFullBlockResult)
errorMap := make(map[string]rpc.GetFullBlockResult)
for i, result := range results {
if i < len(chunk) {
if result.Error == nil {
successMap[chunk[i].String()] = result
} else {
errorMap[chunk[i].String()] = result
}
}
}
// Build final results in original order
var finalResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult
for _, block := range chunk {
if result, ok := successMap[block.String()]; ok {
finalResults = append(finalResults, result)
} else {
if fr, ok := errorMap[block.String()]; ok && fr.Error != nil {
// Preserve the original RPC error
failedResults = append(failedResults, fr)
} else {
// Fallback to generic error if none recorded
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: block,
Error: fmt.Errorf("failed to fetch block"),
})
}
}
}
return finalResults, failedResults
🤖 Prompt for AI Agents
internal/worker/worker.go around lines 205 to 265: the current implementation
discards original RPC error details and replaces them with a generic "failed to
fetch block" when assembling failed results; instead, preserve and return the
original rpc.GetFullBlockResult errors by building maps for both successes and
failures keyed by block.String() using the returned results (or the
corresponding result by index when available), and when a block is missing from
results fall back to constructing a rpc.GetFullBlockResult with the block number
and a clear contextual error; ensure finalResults keeps original successful
results and failedResults contains the original rpc.GetFullBlockResult entries
(with their Error fields) rather than a generic error string.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/orchestrator/committer_test.go (1)

72-78: Mixed mocking frameworks (EXPECT vs testify mock). Standardize to one.

You use testify’s AssertNotCalled but gomock-style EXPECT elsewhere. Pick one; below is testify-only.

   committer.cleanupProcessedStagingBlocks()
   mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything)

   committer.lastPublishedBlock.Store(90)
-  mockRPC.EXPECT().GetChainID().Return(chainID)
-  mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil)
+  mockRPC.On("GetChainID").Return(chainID).Once()
+  // Match big.Int by value to avoid pointer inequality flakiness.
+  matchBig := func(v *big.Int) bool { return v != nil && v.Cmp(big.NewInt(90)) == 0 }
+  mockStagingStorage.
+    On("DeleteStagingDataOlderThan", chainID, mock.MatchedBy(matchBig)).
+    Return(nil).Once()
   committer.cleanupProcessedStagingBlocks()
+  mockRPC.AssertExpectations(t)
+  mockStagingStorage.AssertExpectations(t)

Alternative: switch fully to gomock (use gomock.Controller, gomock.Any()) and drop testify/mock assertions.

🧹 Nitpick comments (3)
internal/orchestrator/poller_test.go (1)

7-13: Replace placeholder with minimal, Worker-driven Poller tests (or gate with a build tag).

Placeholders erode coverage. Either:

  • add a minimal table-driven test using the new Worker–Poller path, or
  • gate the file with a build tag (e.g., //go:build integration) if the tests are temporarily integration-only.

I can scaffold a minimal Poller–Worker test harness against your new API if you share Poller/Worker constructors.

internal/orchestrator/committer_test.go (2)

30-53: Remove “Removed - test needs to be updated…” comment noise.

These comments add churn and can trigger format diffs. Delete them.

-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
-
-// Removed - test needs to be updated for new implementation
+// (intentionally left blank)

80-81: Don’t leave empty tests.

Skip with a reason or implement the behavior.

 func TestStartCommitter(t *testing.T) {
-}
+  t.Skip("TODO: implement start/stop lifecycle test after Worker–Poller wiring stabilizes")
+}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 37a24cb and 038fb00.

📒 Files selected for processing (4)
  • internal/orchestrator/committer_test.go (2 hunks)
  • internal/orchestrator/poller_test.go (1 hunks)
  • internal/orchestrator/reorg_handler_test.go (1 hunks)
  • test/mocks/MockIStagingStorage.go (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Go Code Format Check
internal/orchestrator/reorg_handler_test.go

[error] 1-1: Go fmt formatting check failed. Run 'gofmt -w .' to fix formatting.

internal/orchestrator/poller_test.go

[error] 1-1: Go fmt formatting check failed. Run 'gofmt -w .' to fix formatting.

🔇 Additional comments (6)
internal/orchestrator/reorg_handler_test.go (1)

1-13: Remove go fmt suggestion; code is already gofmt-compliant.

Likely an incorrect or invalid review comment.

test/mocks/MockIStagingStorage.go (3)

151-177: Typed expecter for GetStagingData looks good.

Signatures and Return/RunAndReturn plumbing align with testify/mock patterns.


218-244: Typed helper for GetStagingDataBlockRange LGTM.

Parameter ordering and Return helpers match the method signature.


121-149: Ignore BlockFailure API note; prefer flexible QueryFilter matching in mocks

  • The change to return []common.BlockData and updated panic message is correct.
  • In tests, matching storage.QueryFilter by strict value can be brittle—use mock.MatchedBy or mock.Anything for more resilient matching.

Likely an incorrect or invalid review comment.

internal/orchestrator/poller_test.go (1)

7-13: Fix formatting failures

CI shows formatting errors but gofmt isn’t available in this environment—please run:

gofmt -l .
goimports -l .
gci write --skip-generated --custom-order -s standard,default,company --skip-vendor .
gofmt -s -w .

locally to format the repo and unblock CI.

internal/orchestrator/committer_test.go (1)

22-24: Avoid zero-value Poller; construct or mock via factory.

If Poller has required deps, &Poller{} risks nil derefs inside NewCommitter. Prefer a real constructor or a dedicated mock/fake.

If a constructor exists (e.g., NewPoller or NewPollerForTest), switch to it; otherwise confirm NewCommitter tolerates a nil/zero Poller.

Also applies to: 64-66

Comment on lines +7 to +8
// All tests removed - need to be updated for new implementation
// The tests were failing due to missing mock expectations after refactoring
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Don’t land with tests removed—recreate ReorgHandler tests now.

Removing all tests guts coverage for a high‑risk area. Add rewritten tests in this PR (or block merge). Target cases:

  • Uses stored last reorg-checked block when >0; else falls back to FromBlock.
  • Storage retrieval error path falls back to FromBlock.
  • Reorg detected vs no-reorg paths with new Worker/Poller wiring.
  • Edge cases around off-by-one at boundary blocks.

Run to confirm no remaining Reorg tests elsewhere and to locate code paths to cover:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Any Go tests mentioning "reorg"?
rg -nP --type=go -g '**/*_test.go' -C2 '(?i)\breorg\b' || true
# Any tests named for ReorgHandler?
rg -nP --type=go -g '**/*_test.go' '\bTest(Reorg|ReorgHandler)\b' || true
# Validate code paths you likely need to exercise
rg -nP --type=go '(?i)\b(last\s*reorg[-_ ]?checked|FromBlock|ForceFromBlock|reorg\s*handler)\b' || true

Length of output: 7322


Add comprehensive ReorgHandler tests in internal/orchestrator/reorg_handler_test.go before merging.

Target cases:

  • Stored last-checked block > 0 vs fallback to configured FromBlock
  • Storage retrieval error path falling back to FromBlock
  • Reorg detected vs no-reorg paths with the new Worker/Poller wiring
  • Edge cases around off-by-one at boundary blocks
🤖 Prompt for AI Agents
In internal/orchestrator/reorg_handler_test.go around lines 7 to 8, the test
suite was removed and must be rewritten to cover the new ReorgHandler
implementation: add unit tests that (1) verify when storage returns a
last-checked block > 0 the handler uses that value, and when storage returns
nothing it falls back to configured FromBlock, (2) simulate storage retrieval
errors and assert fallback to FromBlock, (3) exercise both reorg-detected and
no-reorg flows using the new Worker/Poller wiring by mocking the poller/worker
interfaces and expectations, and (4) include boundary tests for off-by-one
behavior at block edges; for each test, set up minimal mocks with explicit
expectations, invoke the handler, and assert the resulting start block and
reorg/no-reorg outcomes.

Comment on lines 10 to 13
func TestReorgHandlerPlaceholder(t *testing.T) {
// Placeholder test to keep the test file valid
t.Skip("Reorg handler tests need to be rewritten for new implementation")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace placeholder skip with concrete subtest scaffolding.

Provide actionable TODOs per new behavior so the suite isn’t silently skipped.

-func TestReorgHandlerPlaceholder(t *testing.T) {
-	// Placeholder test to keep the test file valid
-	t.Skip("Reorg handler tests need to be rewritten for new implementation")
-}
+func TestReorgHandler_ReorgStartPoint(t *testing.T) {
+	t.Run("uses stored last reorg-checked block when positive", func(t *testing.T) {
+		t.Skip("TODO: inject mock store returning >0 and assert start point")
+	})
+	t.Run("falls back to FromBlock when stored invalid or retrieval fails", func(t *testing.T) {
+		t.Skip("TODO: mock store error/zero and assert fallback to config FromBlock")
+	})
+}
+
+func TestReorgHandler_ReorgDecisionFlow(t *testing.T) {
+	t.Run("no reorg detected -> proceeds without rollback", func(t *testing.T) {
+		t.Skip("TODO: wire Worker/Poller mocks to return stable chain; assert no rollback")
+	})
+	t.Run("reorg detected -> triggers rollback/re-validate via Worker", func(t *testing.T) {
+		t.Skip("TODO: simulate reorg; assert correct calls to Worker/Validator")
+	})
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestReorgHandlerPlaceholder(t *testing.T) {
// Placeholder test to keep the test file valid
t.Skip("Reorg handler tests need to be rewritten for new implementation")
}
func TestReorgHandler_ReorgStartPoint(t *testing.T) {
t.Run("uses stored last reorg-checked block when positive", func(t *testing.T) {
t.Skip("TODO: inject mock store returning >0 and assert start point")
})
t.Run("falls back to FromBlock when stored invalid or retrieval fails", func(t *testing.T) {
t.Skip("TODO: mock store error/zero and assert fallback to config FromBlock")
})
}
func TestReorgHandler_ReorgDecisionFlow(t *testing.T) {
t.Run("no reorg detected -> proceeds without rollback", func(t *testing.T) {
t.Skip("TODO: wire Worker/Poller mocks to return stable chain; assert no rollback")
})
t.Run("reorg detected -> triggers rollback/re-validate via Worker", func(t *testing.T) {
t.Skip("TODO: simulate reorg; assert correct calls to Worker/Validator")
})
}
🤖 Prompt for AI Agents
In internal/orchestrator/reorg_handler_test.go around lines 10 to 13, replace
the single t.Skip placeholder with concrete subtest scaffolding: create named
t.Run subtests for each expected reorg behavior (e.g., "handles-simple-reorg",
"ignores-nonreorg-events", "retries-on-transient-failure") and inside each
subtest add TODO comments describing the specific setup, inputs, and assertions
to implement (mocking of dependencies, expected state changes, and
error-handling paths); ensure tests currently disabled by Skip are present as
failing TODOs or t.Skipf with a clear per-subtest message so the suite no longer
silently skips all tests and provides actionable next steps for each behavior to
be implemented.

Comment on lines +179 to 216
// GetStagingDataBlockRange provides a mock function with given fields: chainId
func (_m *MockIStagingStorage) GetStagingDataBlockRange(chainId *big.Int) (*big.Int, *big.Int, error) {
ret := _m.Called(chainId)

if len(ret) == 0 {
panic("no return value specified for GetLastStagedBlockNumber")
panic("no return value specified for GetStagingDataBlockRange")
}

var r0 *big.Int
var r1 error
if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, *big.Int) (*big.Int, error)); ok {
return rf(chainId, rangeStart, rangeEnd)
var r1 *big.Int
var r2 error
if rf, ok := ret.Get(0).(func(*big.Int) (*big.Int, *big.Int, error)); ok {
return rf(chainId)
}
if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, *big.Int) *big.Int); ok {
r0 = rf(chainId, rangeStart, rangeEnd)
if rf, ok := ret.Get(0).(func(*big.Int) *big.Int); ok {
r0 = rf(chainId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*big.Int)
}
}

if rf, ok := ret.Get(1).(func(*big.Int, *big.Int, *big.Int) error); ok {
r1 = rf(chainId, rangeStart, rangeEnd)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// MockIStagingStorage_GetLastStagedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastStagedBlockNumber'
type MockIStagingStorage_GetLastStagedBlockNumber_Call struct {
*mock.Call
}

// GetLastStagedBlockNumber is a helper method to define mock.On call
// - chainId *big.Int
// - rangeStart *big.Int
// - rangeEnd *big.Int
func (_e *MockIStagingStorage_Expecter) GetLastStagedBlockNumber(chainId interface{}, rangeStart interface{}, rangeEnd interface{}) *MockIStagingStorage_GetLastStagedBlockNumber_Call {
return &MockIStagingStorage_GetLastStagedBlockNumber_Call{Call: _e.mock.On("GetLastStagedBlockNumber", chainId, rangeStart, rangeEnd)}
}

func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) Run(run func(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int)) *MockIStagingStorage_GetLastStagedBlockNumber_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*big.Int), args[1].(*big.Int), args[2].(*big.Int))
})
return _c
}

func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) Return(maxBlockNumber *big.Int, err error) *MockIStagingStorage_GetLastStagedBlockNumber_Call {
_c.Call.Return(maxBlockNumber, err)
return _c
}

func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int, *big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastStagedBlockNumber_Call {
_c.Call.Return(run)
return _c
}

// GetStagingData provides a mock function with given fields: qf
func (_m *MockIStagingStorage) GetStagingData(qf storage.QueryFilter) ([]common.BlockData, error) {
ret := _m.Called(qf)

if len(ret) == 0 {
panic("no return value specified for GetStagingData")
}

var r0 []common.BlockData
var r1 error
if rf, ok := ret.Get(0).(func(storage.QueryFilter) ([]common.BlockData, error)); ok {
return rf(qf)
}
if rf, ok := ret.Get(0).(func(storage.QueryFilter) []common.BlockData); ok {
r0 = rf(qf)
if rf, ok := ret.Get(1).(func(*big.Int) *big.Int); ok {
r1 = rf(chainId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]common.BlockData)
if ret.Get(1) != nil {
r1 = ret.Get(1).(*big.Int)
}
}

if rf, ok := ret.Get(1).(func(storage.QueryFilter) error); ok {
r1 = rf(qf)
if rf, ok := ret.Get(2).(func(*big.Int) error); ok {
r2 = rf(chainId)
} else {
r1 = ret.Error(1)
r2 = ret.Error(2)
}

return r0, r1
return r0, r1, r2
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Range API addition: align nil-range semantics with consumers and backends.

  • Clarify expected behavior when no data exists (both min/max nil?) and ensure all consumers handle nil without deref.
  • Verify all IStagingStorage implementations provide this method and remove any last-staged-block usages.

Run to verify implementations and stale APIs:


🏁 Script executed:

#!/bin/bash
# Implementations/callers of the new method
rg -nP --type=go -C3 '\bGetStagingDataBlockRange\s*\('

# Stale "last staged" or similar APIs that should be gone
rg -nP --type=go -C2 '\b(GetLastStagedBlock|LastStaged)\b'

Length of output: 3847


Align nil-range semantics with consumers and backends

  • Define expected return (nil, nil, nil) when no data exists and update docs/tests and all callers to guard against nil dereferences.
  • Verified every connector implements GetStagingDataBlockRange and no legacy LastStaged APIs remain.
🤖 Prompt for AI Agents
In test/mocks/MockIStagingStorage.go around lines 179 to 216, the mock currently
panics when no return values are specified; change behaviour so that when no
data exists it returns (nil, nil, nil) instead of panicking. Replace the early
panic on len(ret) == 0 with a safe return of (nil, nil, nil), and ensure the
subsequent type assertions already gracefully handle nil entries (keep the
existing checks before type assertions). Update any tests that relied on the
panic to instead set explicit nil returns or assert the nil triple from the
mock, and add/adjust doc comments to state that absence of data yields (nil,
nil, nil).

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
internal/orchestrator/poller.go (4)

145-151: Avoid busy-wait when there are no new blocks.

When ErrNoNewBlocks, the loop immediately continues and spins. Insert a small delay tied to triggerIntervalMs.

-          if err != nil {
-            if err != ErrNoNewBlocks {
-              log.Error().Err(err).Msg("Failed to get block range to poll")
-            }
-            continue
-          }
+          if err != nil {
+            if err != ErrNoNewBlocks {
+              log.Error().Err(err).Msg("Failed to get block range to poll")
+            } else {
+              // Avoid busy-waiting when chain tip hasn't advanced.
+              select {
+              case <-time.After(time.Duration(p.triggerIntervalMs) * time.Millisecond):
+              case <-pollCtx.Done():
+                return
+              }
+            }
+            continue
+          }

209-237: Fix wrong mutex and serialize Request to avoid races with concurrent calls.

lastRequestedBlock is updated while holding lastPolledBlockMutex, and concurrent Request() calls can trample each other or race with getNextBlockRange.

 func (p *Poller) Request(ctx context.Context, blockNumbers []*big.Int) []common.BlockData {
+  // Serialize request computations to avoid conflicting range math with pollers/other requests.
+  p.blockRangeMutex.Lock()
+  defer p.blockRangeMutex.Unlock()
   startBlock, endBlock := blockNumbers[0], blockNumbers[len(blockNumbers)-1]

   p.lastPolledBlockMutex.RLock()
   lastPolledBlock := new(big.Int).Set(p.lastPolledBlock)
   p.lastPolledBlockMutex.RUnlock()

   if startBlock.Cmp(lastPolledBlock) > 0 {
     log.Debug().Msgf("Requested block %s - %s is greater than last polled block %s, waiting for poller", startBlock.String(), endBlock.String(), lastPolledBlock.String())
     return nil
   }

   // If the requested end block exceeds, then truncate the block numbers list
   if endBlock.Cmp(lastPolledBlock) > 0 {
     lastPolledIndex := new(big.Int).Sub(lastPolledBlock, startBlock).Int64()
     blockNumbers = blockNumbers[:lastPolledIndex+1]
     log.Debug().Msgf("Truncated requested block range to %s - %s (last polled block: %s)", blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String(), lastPolledBlock.String())
   }

   blockData, highestBlockNumber := p.pollBlockData(ctx, blockNumbers)
   if len(blockData) == 0 || highestBlockNumber == nil {
     return nil
   }

-  p.lastPolledBlockMutex.Lock()
-  p.lastRequestedBlock = new(big.Int).Set(highestBlockNumber)
-  p.lastPolledBlockMutex.Unlock()
+  p.lastRequestedBlockMutex.Lock()
+  p.lastRequestedBlock = new(big.Int).Set(highestBlockNumber)
+  p.lastRequestedBlockMutex.Unlock()
   return blockData
 }
#!/bin/bash
set -euo pipefail
# Inspect assignments to lastRequestedBlock to ensure they're protected by lastRequestedBlockMutex.
rg -n -C2 '\blastRequestedBlock\s*=' --type go

# Find any potentially unsafe uses of pollUntilBlock.Sign() that could deref nil.
rg -n -C2 'pollUntilBlock\.Sign\(' --type go

260-271: Skip errored RPC results in conversion.

Include result.Error guard to avoid staging empty/invalid data.

 func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult) []common.BlockData {
   blockData := make([]common.BlockData, 0, len(results))
   for _, result := range results {
+    if result.Error != nil {
+      log.Error().Err(result.Error).Msgf("Error in block result for block %s", result.BlockNumber)
+      continue
+    }
     blockData = append(blockData, common.BlockData{
       Block:        result.Data.Block,
       Logs:         result.Data.Logs,
       Transactions: result.Data.Transactions,
       Traces:       result.Data.Traces,
     })
   }
   return blockData
 }

294-335: Don’t hold blockRangeMutex across RPC; also normalize “invalid range” as ErrNoNewBlocks.

Holding the lock while calling GetLatestBlockNumber can stall other workers and risks deadlocks if RPC hangs. Also, returning nil,nil for invalid range forces a downstream poll error; prefer ErrNoNewBlocks here.

 func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
-  p.blockRangeMutex.Lock()
-  defer p.blockRangeMutex.Unlock()
-
   latestBlock, err := p.rpc.GetLatestBlockNumber(ctx)
   if err != nil {
     return nil, err
   }
 
+  p.blockRangeMutex.Lock()
+  defer p.blockRangeMutex.Unlock()
+
   p.lastPendingFetchBlockMutex.Lock()
   lastPendingFetchBlock := new(big.Int).Set(p.lastPendingFetchBlock)
   p.lastPendingFetchBlockMutex.Unlock()
@@
   endBlock := p.getEndBlockForRange(startBlock, latestBlock)
   if startBlock.Cmp(endBlock) > 0 {
     log.Debug().Msgf("Invalid range: start block %s is greater than end block %s, skipping", startBlock, endBlock)
-    return nil, nil
+    return nil, ErrNoNewBlocks
   }
@@
   p.lastPendingFetchBlockMutex.Lock()
   p.lastPendingFetchBlock = new(big.Int).Set(endBlock)
   p.lastPendingFetchBlockMutex.Unlock()
 
   return p.createBlockNumbersForRange(startBlock, endBlock), nil
 }
🧹 Nitpick comments (3)
internal/orchestrator/poller.go (3)

69-91: Boundless init: consider setting pollUntilBlock to 0 to avoid nil checks.

This is optional given the fix below in reachedPollLimit, but it removes a whole class of nil pitfalls for “boundless” pollers.

Apply after constructing poller:

   poller := &Poller{
     chainId:           rpc.GetChainID(),
     rpc:               rpc,
     triggerIntervalMs: int64(triggerInterval),
     blocksPerPoll:     int64(blocksPerPoll),
     storage:           storage,
     parallelPollers:   config.Cfg.Poller.ParallelPollers,
   }
+  // For boundless polling, make "until" an explicit zero value instead of nil.
+  poller.pollUntilBlock = big.NewInt(0)

97-117: Initialization: surface storage readiness issues to callers.

Swallowing GetMaxBlockNumber errors can mask misconfigurations/downstream outages during startup. Either return an error from NewPoller (signature change) or record a health flag checked by Start() and exported via a readiness probe.


239-258: Minor logging nit.

Prefer structured logging to avoid fmt string churn and to ensure big.Ints are consistently stringified.

- log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], blockNumbers[len(blockNumbers)-1])
+ log.Debug().
+   Int("block_count", len(blockNumbers)).
+   Str("start_block", blockNumbers[0].String()).
+   Str("end_block", blockNumbers[len(blockNumbers)-1].String()).
+   Msg("Polling blocks")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 038fb00 and b7a399f.

📒 Files selected for processing (1)
  • internal/orchestrator/poller.go (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/orchestrator/poller.go (5)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/worker/worker.go (1)
  • Worker (35-41)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/metrics/metrics.go (3)
  • PollerLastTriggeredBlock (59-62)
  • PolledBatchSize (52-55)
  • StagingInsertDuration (111-115)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
🔇 Additional comments (6)
internal/orchestrator/poller.go (6)

19-20: Defaults updated (50 blocks, 100ms): LGTM, but watch load.

The tighter 100ms tick can spike RPC/storage load. If this hits rate limits, consider making it backoff-aware or bumping the interval via config.


23-38: Poller state additions look good.

Clear separation of lastPolled/lastRequested/lastPending plus associated mutexes is the right move.


48-56: Option wiring is sound.

Nil-guard + dependency injection for Worker is correct.


158-161: Poll path wiring: LGTM.

Error path is logged and continues; success bubbles lastPolledBlock—consistent with Start loop.


188-207: Staging after polling is correct; metrics update is appropriate.

lastPolledBlock update under lock + gauge set is sound.


273-288: Staging path: LGTM.

No-op on empty slice, duration metric + debug timing are fine.

@jakeloo jakeloo merged commit c4b86db into main Aug 29, 2025
4 checks passed
@jakeloo jakeloo deleted the jl/fix-committer branch August 29, 2025 09:33
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/worker/worker.go (2)

126-135: Missing results aren’t retried

Looping over results length ignores tail blocks when fewer results are returned; those blocks never get retried. Iterate over the full chunk and mark missing indices as failed.

- for i, result := range results {
-   if i < len(chunk) {
-     if result.Error == nil {
-       successMap[chunk[i].String()] = result
-     } else {
-       failedBlocks = append(failedBlocks, chunk[i])
-     }
-   }
- }
+ for i := range chunk {
+   var r rpc.GetFullBlockResult
+   if i < len(results) {
+     r = results[i]
+   } else {
+     r = rpc.GetFullBlockResult{BlockNumber: chunk[i], Error: fmt.Errorf("missing result")}
+   }
+   if r.Error == nil {
+     successMap[chunk[i].String()] = r
+   } else {
+     failedBlocks = append(failedBlocks, chunk[i])
+   }
+ }

381-409: Fallback should fetch only missing blocks and merge, not overwrite

Current logic throws away partial successes and re-fetches entire ranges, amplifying load and hiding failures due to the mixing issue above.

- // Determine which source to use
- sourceType := SourceTypeRPC
- var results []rpc.GetFullBlockResult
- var errors []rpc.GetFullBlockResult
- success := false
- if w.shouldUseSource(ctx, w.staging, blockNumbers) {
-   sourceType = SourceTypeStaging
-   results, errors = w.processBatchWithRetry(ctx, blockNumbers, sourceType, w.fetchFromStaging)
-   success = len(results) > 0 && len(errors) == 0
- }
- if !success && w.shouldUseSource(ctx, w.archive, blockNumbers) {
-   sourceType = SourceTypeArchive
-   results, errors = w.processBatchWithRetry(ctx, blockNumbers, sourceType, w.fetchFromArchive)
-   success = len(results) > 0 && len(errors) == 0
- }
- if !success {
-   sourceType = SourceTypeRPC
-   results, errors = w.processBatchWithRetry(ctx, blockNumbers, sourceType, w.fetchFromRPC)
-   success = len(results) > 0 && len(errors) == 0
- }
+ // Attempt staged -> archive -> RPC, fetching only missing blocks and merging
+ resultsMap := make(map[string]rpc.GetFullBlockResult)
+ missing := append([]*big.Int(nil), blockNumbers...)
+ var results []rpc.GetFullBlockResult
+ var errors []rpc.GetFullBlockResult
+
+ try := func(st SourceType, fetch func(context.Context, []*big.Int) []rpc.GetFullBlockResult) {
+   if len(missing) == 0 {
+     return
+   }
+   ok := st == SourceTypeRPC || w.shouldUseSource(ctx, map[SourceType]source.ISource{
+     SourceTypeArchive: w.archive, SourceTypeStaging: w.staging,
+   }[st], missing)
+   if !ok {
+     return
+   }
+   succ, fail := w.processBatchWithRetry(ctx, missing, st, fetch)
+   for _, r := range succ {
+     resultsMap[r.BlockNumber.String()] = r
+   }
+   errors = fail
+   // Recompute missing from failures only
+   missing = nil
+   for _, fr := range fail {
+     missing = append(missing, fr.BlockNumber)
+   }
+ }
+ try(SourceTypeStaging, w.fetchFromStaging)
+ try(SourceTypeArchive, w.fetchFromArchive)
+ try(SourceTypeRPC, w.fetchFromRPC)
+
+ // Materialize results in requested order
+ results = results[:0]
+ for _, bn := range blockNumbers {
+   if r, ok := resultsMap[bn.String()]; ok {
+     results = append(results, r)
+   }
+ }
@@
- if !success {
-   for _, errResult := range errors {
+ if len(errors) > 0 {
+   for _, errResult := range errors {
     log.Error().Err(errResult.Error).Msgf("Error fetching block %s", errResult.BlockNumber.String())
-   }
- }
+   }
+ }

Also applies to: 411-416

♻️ Duplicate comments (11)
internal/orchestrator/reorg_handler_test.go (2)

7-8: Blocker: Don’t remove critical ReorgHandler tests—restore coverage now.

Eliminating these tests guts coverage for a high‑risk path. Re‑add tests in this PR before merge. Cover at least:

  • Uses stored last reorg-checked block (>0) vs fallback to FromBlock
  • Storage retrieval error → fallback to FromBlock
  • Reorg detected vs no-reorg with new Worker/Poller wiring
  • Boundary off‑by‑one at block edges

Run to confirm no remaining ReorgHandler/reorg tests elsewhere:

#!/bin/bash
set -euo pipefail
rg -nP --type=go -g '**/*_test.go' -C2 '(?i)\breorg(handler)?\b' || true
rg -nP --type=go '(?i)\b(last\s*reorg[-_ ]?checked|FromBlock|reorg\s*handler)\b' || true

10-12: Replace placeholder Skip with concrete subtest scaffolding.

Keep the file compiling but make next steps explicit and actionable.

-func TestReorgHandlerPlaceholder(t *testing.T) {
-	// Placeholder test to keep the test file valid
-	t.Skip("Reorg handler tests need to be rewritten for new implementation")
-}
+func TestReorgHandler_ReorgStartPoint(t *testing.T) {
+	t.Run("uses stored last reorg-checked block when >0", func(t *testing.T) {
+		t.Skip("TODO: inject mock store returning >0; assert start block used")
+	})
+	t.Run("falls back to FromBlock when store returns 0 or error", func(t *testing.T) {
+		t.Skip("TODO: mock store error/zero; assert fallback to config FromBlock")
+	})
+}
+
+func TestReorgHandler_ReorgDecisionFlow(t *testing.T) {
+	t.Run("no reorg detected -> no rollback", func(t *testing.T) {
+		t.Skip("TODO: wire Worker/Poller to stable chain; assert no rollback/re-validate")
+	})
+	t.Run("reorg detected -> triggers rollback/re-validate", func(t *testing.T) {
+		t.Skip("TODO: simulate fork; assert correct rollback and re-validate calls")
+	})
+	t.Run("boundary off-by-one at edges", func(t *testing.T) {
+		t.Skip("TODO: assert inclusive/exclusive bounds at start/end blocks")
+	})
+}
internal/worker/worker.go (4)

47-56: Good: rpcChunkSize guarded against zero

This addresses the SliceToChunks zero-size issue. Nice.


136-211: Failures mixed into successMap; wrong success detection and no error propagation

Errorful retries are inserted into successMap and later treated as successes; errors never reach failedResults, breaking fallback decisions (success becomes true with hidden failures).

Refactor to keep separate success and error maps, merge recursive results properly, and preserve original errors:

- // If only one block failed, retry once more
+ // If only one block failed, retry once more
   if len(failedBlocks) == 1 {
     retryResults := fetchFunc(ctx, failedBlocks)
     if len(retryResults) > 0 {
-      if retryResults[0].Error == nil {
-        successMap[failedBlocks[0].String()] = retryResults[0]
-      } else {
-        // Keep the error result
-        successMap[failedBlocks[0].String()] = rpc.GetFullBlockResult{
-          BlockNumber: failedBlocks[0],
-          Error:       retryResults[0].Error,
-        }
-      }
+      if retryResults[0].Error == nil {
+        successMap[failedBlocks[0].String()] = retryResults[0]
+      } else {
+        // Track failure; don't pollute successMap
+        errorMap := make(map[string]rpc.GetFullBlockResult)
+        errorMap[failedBlocks[0].String()] = retryResults[0]
+        // Build final results below will use errorMap
+        _ = errorMap
+      }
     }
   } else if len(failedBlocks) > 1 {
@@
-    var rwg sync.WaitGroup
-    var rwgMutex sync.Mutex
+    var rwg sync.WaitGroup
+    var rwgMutex sync.Mutex
+    errorMap := make(map[string]rpc.GetFullBlockResult)
@@
-      leftResults, _ := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
-      // Add results to map
-      for _, r := range leftResults {
+      leftSuccess, leftFailed := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
+      for _, r := range leftSuccess {
         if r.BlockNumber != nil {
           rwgMutex.Lock()
           successMap[r.BlockNumber.String()] = r
           rwgMutex.Unlock()
         }
       }
+      for _, r := range leftFailed {
+        if r.BlockNumber != nil {
+          rwgMutex.Lock()
+          errorMap[r.BlockNumber.String()] = r
+          rwgMutex.Unlock()
+        }
+      }
@@
-      rightResults, _ := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
-      // Add results to map
-      for _, r := range rightResults {
+      rightSuccess, rightFailed := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
+      for _, r := range rightSuccess {
         if r.BlockNumber != nil {
           rwgMutex.Lock()
           successMap[r.BlockNumber.String()] = r
           rwgMutex.Unlock()
         }
       }
+      for _, r := range rightFailed {
+        if r.BlockNumber != nil {
+          rwgMutex.Lock()
+          errorMap[r.BlockNumber.String()] = r
+          rwgMutex.Unlock()
+        }
+      }
@@
- // Build final results in original order
+ // Build final results in original order
   var finalResults []rpc.GetFullBlockResult
   var failedResults []rpc.GetFullBlockResult
   for _, block := range chunk {
     if result, ok := successMap[block.String()]; ok {
       finalResults = append(finalResults, result)
     } else {
-      // This should not happen as we have retried all failed blocks
-      failedResults = append(failedResults, rpc.GetFullBlockResult{
-        BlockNumber: block,
-        Error:       fmt.Errorf("failed to fetch block"),
-      })
+      // Prefer original failure if known
+      if em, ok := errorMap[block.String()]; ok {
+        failedResults = append(failedResults, em)
+      } else {
+        failedResults = append(failedResults, rpc.GetFullBlockResult{
+          BlockNumber: block,
+          Error:       fmt.Errorf("failed to fetch block"),
+        })
+      }
     }
   }

Note: define errorMap before single-retry branch and reuse it across paths.


405-409: Good: no sourceType shadowing on RPC fallback

Assignment (not :=) avoids the earlier logging mismatch.


214-273: Non-retry path drops error details and misclassifies failures

processChunk should also preserve error details and return them via failed, not embed them in success results.

- successMap := make(map[string]rpc.GetFullBlockResult)
+ successMap := make(map[string]rpc.GetFullBlockResult)
+ errorMap := make(map[string]rpc.GetFullBlockResult)
@@
- if result.Error == nil {
-   successMap[chunk[i].String()] = result
- }
+ if result.Error == nil {
+   successMap[chunk[i].String()] = result
+ } else {
+   errorMap[chunk[i].String()] = result
+ }
@@
- if result, ok := successMap[block.String()]; ok {
+ if result, ok := successMap[block.String()]; ok {
   finalResults = append(finalResults, result)
 } else {
-  // This should not happen as we have retried all failed blocks
-  failedResults = append(failedResults, rpc.GetFullBlockResult{
-    BlockNumber: block,
-    Error:       fmt.Errorf("failed to fetch block"),
-  })
+  if em, ok := errorMap[block.String()]; ok {
+    failedResults = append(failedResults, em)
+  } else {
+    failedResults = append(failedResults, rpc.GetFullBlockResult{
+      BlockNumber: block,
+      Error:       fmt.Errorf("failed to fetch block"),
+    })
+  }
 }
internal/orchestrator/poller.go (5)

290-292: Nil-deref in reachedPollLimit when pollUntilBlock is nil or zero

Guard nil and non-positive values.

-func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
-  return blockNumber == nil || (p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0)
-}
+func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
+  if blockNumber == nil {
+    return true
+  }
+  if p.pollUntilBlock == nil || p.pollUntilBlock.Sign() == 0 {
+    return false
+  }
+  return blockNumber.Cmp(p.pollUntilBlock) >= 0
+}

300-304: Good: moved RPC call outside blockRangeMutex

Reduces contention and avoids lock-holding during network I/O.


145-151: Add backoff on ErrNoNewBlocks to avoid busy wait

The loop spins every 100ms without sleeping when there’s nothing to poll.

 if err != nil {
   if err != ErrNoNewBlocks {
     log.Error().Err(err).Msg("Failed to get block range to poll")
-  }
-  continue
+  } else {
+    select {
+    case <-time.After(100 * time.Millisecond):
+    case <-pollCtx.Done():
+      return
+    }
+  }
+  continue
 }

209-237: Synchronize Request with range planning and update correct state

Concurrent Request calls can race with getNextBlockRange. Serialize with blockRangeMutex.

 func (p *Poller) Request(ctx context.Context, blockNumbers []*big.Int) []common.BlockData {
+  p.blockRangeMutex.Lock()
+  defer p.blockRangeMutex.Unlock()
   startBlock, endBlock := blockNumbers[0], blockNumbers[len(blockNumbers)-1]
   ...
   p.lastRequestedBlockMutex.Lock()
   p.lastRequestedBlock = new(big.Int).Set(highestBlockNumber)
   p.lastRequestedBlockMutex.Unlock()
   return blockData
 }

260-271: Skip errored RPC results when converting to BlockData

Avoid staging invalid data; log and continue.

 for _, result := range results {
+  if result.Error != nil {
+    log.Error().Err(result.Error).Msgf("Error in block result for block %s", result.BlockNumber)
+    continue
+  }
   blockData = append(blockData, common.BlockData{
     Block:        result.Data.Block,
     Logs:         result.Data.Logs,
     Transactions: result.Data.Transactions,
     Traces:       result.Data.Traces,
   })
 }
🧹 Nitpick comments (6)
internal/worker/worker.go (4)

80-87: Defensive nil checks for archive/staging fetchers

Calling through a nil source panics if callers ever bypass shouldUseSource. Add safe guards.

 func (w *Worker) fetchFromArchive(ctx context.Context, blocks []*big.Int) []rpc.GetFullBlockResult {
-  return w.archive.GetFullBlocks(ctx, blocks)
+  if w.archive == nil {
+    return nil
+  }
+  return w.archive.GetFullBlocks(ctx, blocks)
 }
 
 func (w *Worker) fetchFromStaging(ctx context.Context, blocks []*big.Int) []rpc.GetFullBlockResult {
-  return w.staging.GetFullBlocks(ctx, blocks)
+  if w.staging == nil {
+    return nil
+  }
+  return w.staging.GetFullBlocks(ctx, blocks)
 }

281-287: Nit: comment reflects archive-only but code covers staging too

Minor doc fix to avoid confusion.

- // Only enable chunk retrying for RPC
+ // Only enable chunk retrying for RPC
@@
- chunkSize := len(blocks) // Fetch all at once from archive
+ chunkSize := len(blocks) // Fetch all at once from non-RPC sources (archive/staging)

433-439: Fix logging: last requested block index

Use the last requested block, not blockNumbers[len(results)-1].

- Str("first_block", blockNumbers[0].String()).
- Str("last_block", blockNumbers[len(results)-1].String()).
+ Str("first_requested_block", blockNumbers[0].String()).
+ Str("last_requested_block", blockNumbers[len(blockNumbers)-1].String()).

448-458: Close all resources

Also close staging and RPC clients.

 func (w *Worker) Close() error {
   // Close archive if it exists
   if w.archive != nil {
     log.Debug().Msg("Closing archive connection")
     w.archive.Close()
   }
+  if w.staging != nil {
+    log.Debug().Msg("Closing staging connection")
+    w.staging.Close()
+  }
+  if w.rpc != nil {
+    log.Debug().Msg("Closing RPC client")
+    w.rpc.Close()
+  }
   log.Debug().Msg("Worker closed successfully")
   return nil
 }
internal/orchestrator/poller.go (2)

320-324: Return ErrNoNewBlocks for invalid ranges

Returning (nil, nil) causes noisy “no valid block data polled” errors downstream. Signal no work explicitly.

 if startBlock.Cmp(endBlock) > 0 {
   log.Debug().Msgf("Invalid range: start block %s is greater than end block %s, skipping", startBlock, endBlock)
-  return nil, nil
+  return nil, ErrNoNewBlocks
 }

101-109: Init error policy: consider surfacing storage probe failures

Swallowing GetMaxBlockNumber errors can hide boot issues. Either return an error from NewPoller or set an explicit degraded/health flag.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b7a399f and b54cf9a.

📒 Files selected for processing (5)
  • internal/orchestrator/poller.go (7 hunks)
  • internal/orchestrator/poller_test.go (1 hunks)
  • internal/orchestrator/reorg_handler_test.go (1 hunks)
  • internal/storage/badger.go (8 hunks)
  • internal/worker/worker.go (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/storage/badger.go
🧰 Additional context used
🧬 Code graph analysis (2)
internal/worker/worker.go (2)
internal/source/source.go (1)
  • ISource (10-14)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/orchestrator/poller.go (5)
internal/rpc/rpc.go (2)
  • IRPCClient (42-55)
  • GetFullBlockResult (18-22)
internal/worker/worker.go (1)
  • Worker (39-45)
internal/storage/connector.go (1)
  • IStorage (69-73)
internal/metrics/metrics.go (3)
  • PollerLastTriggeredBlock (59-62)
  • PolledBatchSize (52-55)
  • StagingInsertDuration (111-115)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)

Comment on lines +7 to +12
// All tests removed - need to be updated for new implementation
// The tests were failing due to missing mock expectations after refactoring

mockRPC.AssertExpectations(t)
func TestPollerPlaceholder(t *testing.T) {
// Placeholder test to keep the test file valid
t.Skip("Poller tests need to be rewritten for new implementation")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Restore minimal, non-skipped tests

Fully skipping the suite hides regressions. Add at least a smoke test that exercises Request gating (returns nil when requesting beyond lastPolledBlock) and a basic Poll flow via a fake Worker.

Example scaffold:

 func TestPollerPlaceholder(t *testing.T) {
-  // Placeholder test to keep the test file valid
-  t.Skip("Poller tests need to be rewritten for new implementation")
+  if testing.Short() {
+    t.Skip("Short mode")
+  }
+  // TODO: replace with real tests after refactor.
+  t.Skip("Poller tests need to be rewritten for new implementation")
 }

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants